﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using OF.Notify.Entity;
using OF.Notify.Master;
using OF.Notify.Channel;
using OF.Notify.DataHost;
using OF.DistributeService.Core.Common;
using ZooKeeperNet;
using OF.Notify.DataHost.Cluster;

namespace OF.Notify.Client
{
    /// <summary>
    /// Message producer client. It watches the children of
    /// <c>NotifyConfig.TopicParent</c> in ZooKeeper, selects the child whose third
    /// '|'-separated field is numerically smallest as the master data node, and
    /// delivers messages to the <see cref="INotifyService"/> resolved for that node.
    /// </summary>
    public class Productor
    {
        // Service bound to the currently selected master node; null when no node is available.
        internal INotifyService notifyService = null;
        // Raw ZooKeeper child name of the currently selected master node; null when none.
        internal string currentNode = null;

        internal ZooKeeperProxy zk = null;
        // NOTE(review): this value is consumed by the constructor right after the field
        // initializer runs, so from this file there is no visible way to override it.
        internal string ZookeeperConnection = "127.0.0.1:2181";
        internal DataNodeClusterWatch dataNodeClusterWatcher = new DataNodeClusterWatch();
        // Shared by all instances; stays 0 until Init is called.
        internal static int retryCount = 0;
        internal bool isDisposed = false;

        /// <summary>
        /// Registers the notify service provider with <c>ClusterContext</c> and caches
        /// the send-message retry count reported by the cluster context.
        /// </summary>
        /// <param name="provider">Provider used later to resolve an <see cref="INotifyService"/> from a node URL.</param>
        public static void Init(INotifyServiceProvider provider)
        {
            ClusterContext.SetNotifyServiceProvider(provider);
            retryCount = ClusterContext.Get().GetSendMessageRetryCount();
        }

        /// <summary>
        /// ZooKeeper watcher that asks the owning <see cref="Productor"/> to re-select
        /// the master node whenever a watched node is created, deleted, or has its
        /// children changed.
        /// </summary>
        internal class DataNodeClusterWatch : IWatcher
        {
            internal ZooKeeperProxy zk;
            internal Productor productor;

            /// <summary>Binds this watcher to a ZooKeeper proxy and its owning producer.</summary>
            public void SetZk(ZooKeeperProxy zk, Productor productor)
            {
                this.zk = zk;
                this.productor = productor;
            }

            /// <summary>
            /// Handles a watch notification. Events without a path are ignored. Any
            /// exception is logged; it is swallowed only when the owning producer has
            /// already been disposed, otherwise it is rethrown.
            /// </summary>
            /// <param name="event">The ZooKeeper event being delivered.</param>
            public void Process(WatchedEvent @event)
            {
                try
                {
                    if (@event.Path == null)
                    {
                        return;
                    }

                    bool topologyChanged = @event.Type == EventType.NodeCreated
                        || @event.Type == EventType.NodeDeleted
                        || @event.Type == EventType.NodeChildrenChanged;
                    if (!topologyChanged)
                    {
                        return;
                    }

                    // Both references are only set through SetZk, so guard against a
                    // watcher that fires before (or without) being bound.
                    if (productor != null && zk != null && !zk.IsDisposed())
                    {
                        productor.RefreshMasterNode();
                    }
                }
                catch (Exception ex)
                {
                    Util.LogException("DataNodeClusterWatch", ex);
                    // productor may be null here; dereferencing it unguarded would
                    // replace the original exception with a NullReferenceException.
                    if (productor != null && productor.isDisposed)
                    {
                        return;
                    }
                    throw;
                }
            }
        }

        /// <summary>
        /// Connects to ZooKeeper using <see cref="ZookeeperConnection"/> and a 60000 ms
        /// <see cref="TimeSpan"/> (presumably the session timeout; confirm against
        /// <c>ZooKeeperProxy</c>), then performs the initial master node selection.
        /// </summary>
        public Productor()
        {
            zk = new ZooKeeperProxy(ZookeeperConnection, new TimeSpan(0, 0, 0, 0, 60000), new ZooKeeperSafeConnectWatcher(), null);
            dataNodeClusterWatcher.SetZk(zk, this);
            try
            {
                RefreshMasterNode();
            }
            catch
            {
                // The caller never receives a reference to a half-constructed instance,
                // so release the ZooKeeper proxy here instead of leaking it.
                DoDispose();
                throw;
            }
        }

        /// <summary>Returns the service for the current master node, or null when none is selected.</summary>
        public INotifyService GetNotifyService()
        {
            return notifyService;
        }

        /// <summary>
        /// Re-reads the children of <c>NotifyConfig.TopicParent</c> (re-registering the
        /// cluster watcher on both the existence check and the children query) and
        /// updates <see cref="currentNode"/> and <see cref="notifyService"/>.
        /// Child names are expected to contain at least three '|'-separated fields,
        /// the third of which parses as an integer; the child with the smallest such
        /// value is treated as master.
        /// </summary>
        internal void RefreshMasterNode()
        {
            // DoDispose sets the zk field to null; work on a snapshot so the two
            // ZooKeeper calls below cannot observe the field changing between them.
            ZooKeeperProxy zkSnapshot = zk;
            if (zkSnapshot == null)
            {
                return;
            }

            string topicParent = NotifyConfig.TopicParent;
            if (zkSnapshot.Exists(topicParent, dataNodeClusterWatcher) == null)
            {
                currentNode = null;
                notifyService = null;
                return;
            }

            var children = zkSnapshot.GetChildren(topicParent, dataNodeClusterWatcher);
            if (children == null || !children.Any())
            {
                currentNode = null;
                notifyService = null;
                return;
            }

            string masterNode = children.OrderBy(child => int.Parse(child.Split('|')[2])).First();
            if (currentNode == masterNode)
            {
                return;
            }

            // Resolve the service first and publish currentNode last: if parsing or
            // service creation throws, currentNode must not already equal masterNode,
            // otherwise every later refresh would take the early return above and
            // never retry.
            DataNodeProxy proxy = DataNodeProxy.Parse(masterNode);
            INotifyService service = ClusterContext.Get().GetNotifyServiceProvider().GetNotifyService(proxy.GetUrl());
            notifyService = service;
            currentNode = masterNode;
        }

        /// <summary>Returns the raw ZooKeeper child name of the current master node, or null when none is selected.</summary>
        public string GetCurrentNode()
        {
            return currentNode;
        }

        /// <summary>
        /// Delivers a message to the current master node through
        /// <c>Util.SafeLoopUtilTrue</c>, passing <see cref="retryCount"/> and 200 as the
        /// loop arguments (presumably attempt count and delay in ms; confirm against
        /// <c>Util</c>).
        /// </summary>
        /// <param name="topicMessage">Message to send; its body length plus one must not exceed <see cref="UInt16.MaxValue"/>.</param>
        /// <param name="throwExceptionIdsFunc">
        /// Optional callback invoked with the loop index on each attempt; its result is
        /// placed in the request's <c>throwExceptionCaseIds</c>.
        /// </param>
        /// <returns>True when an attempt reports success; false otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="topicMessage"/> is null.</exception>
        /// <exception cref="Exception">No notify service is available, or the message body is too long.</exception>
        public bool SendMessage(TopicMessage topicMessage, Func<int, string[]> throwExceptionIdsFunc)
        {
            if (notifyService == null)
            {
                throw new Exception("No receive message service found.");
            }
            if (topicMessage == null)
            {
                throw new ArgumentNullException("topicMessage");
            }
            if (topicMessage.body.Length + 1 > UInt16.MaxValue)
            {
                throw new Exception("message length exceed max value:" + (UInt16.MaxValue - 1));
            }

            DeliverMessageRequest request = new DeliverMessageRequest
            {
                topicMessage = topicMessage,
                throwExceptionCaseIds = null,
                topicEnum = topicMessage.topicEnum
            };
            CallServiceResult<bool> lastCallResult = null;
            bool delivered = Util.SafeLoopUtilTrue((loopI) =>
            {
                // Read the field once: RefreshMasterNode can set it to null between a
                // null check and the call if it runs concurrently with this method.
                INotifyService service = notifyService;
                if (service == null)
                {
                    Util.LogInfo("notifyService == null");
                    return false;
                }
                if (throwExceptionIdsFunc != null)
                {
                    request.throwExceptionCaseIds = throwExceptionIdsFunc(loopI);
                }
                lastCallResult = service.DeliverMessage(request);
                return lastCallResult != null && lastCallResult.IsSuccess();
            }, retryCount, 200);

            if (delivered)
            {
                return true;
            }

            // lastCallResult is still null when no attempt produced a result (service
            // unavailable on every attempt, or the loop never invoked the callback).
            if (lastCallResult == null)
            {
                Util.LogInfo("send message failed: no result returned by notify service");
            }
            else
            {
                Util.LogInfo("send message failed:" + lastCallResult.ErrorCode + "," + lastCallResult.ErrorMessage);
            }
            return false;
        }

        /// <summary>
        /// Marks this producer as disposed (so the cluster watcher swallows subsequent
        /// errors) and disposes the ZooKeeper proxy if one is held.
        /// </summary>
        public void DoDispose()
        {
            isDisposed = true;
            if (zk != null)
            {
                zk.Dispose();
                zk = null;
            }
        }
    }

}
